/*
 * Copyright 2002-2020 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.http.client.reactive;

import java.util.Collection;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;

import io.netty.buffer.ByteBufAllocator;
import io.netty.handler.codec.http.cookie.Cookie;
import io.netty.handler.codec.http.cookie.DefaultCookie;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Flux;
import reactor.netty.Connection;
import reactor.netty.NettyInbound;
import reactor.netty.http.client.HttpClientResponse;

import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseCookie;
import org.springframework.lang.Nullable;
import org.springframework.util.CollectionUtils;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;

/**
 * {@link ClientHttpResponse} implementation for the Reactor-Netty HTTP client.
 *
 * @author Brian Clozel
 * @author Rossen Stoyanchev
 * @see reactor.netty.http.client.HttpClient
 * @since 5.0
 */
class ReactorClientHttpResponse implements ClientHttpResponse {

    private static final Log logger = LogFactory.getLog(ReactorClientHttpResponse.class);

    private final HttpClientResponse response;

    private final HttpHeaders headers;

    private final NettyInbound inbound;

    private final NettyDataBufferFactory bufferFactory;

    // 0 - not subscribed, 1 - subscribed, 2 - cancelled via connector (before subscribe)
    private final AtomicInteger state = new AtomicInteger();

    private final String logPrefix;


    /**
     * Constructor that matches the inputs from
     * {@link reactor.netty.http.client.HttpClient.ResponseReceiver#responseConnection(BiFunction)}.
     *
     * @since 5.2.8
     */
    public ReactorClientHttpResponse(HttpClientResponse response, Connection connection) {
        this.response = response;
        MultiValueMap<String, String> adapter = new NettyHeadersAdapter(response.responseHeaders());
        this.headers = HttpHeaders.readOnlyHttpHeaders(adapter);
        this.inbound = connection.inbound();
        this.bufferFactory = new NettyDataBufferFactory(connection.outbound().alloc());
        this.logPrefix = (logger.isDebugEnabled() ? "[" + connection.channel().id().asShortText() + "] " : "");
    }

    /**
     * Constructor with inputs extracted from a {@link Connection}.
     *
     * @deprecated as of 5.2.8, in favor of {@link #ReactorClientHttpResponse(HttpClientResponse, Connection)}
     */
    @Deprecated
    public ReactorClientHttpResponse(HttpClientResponse response, NettyInbound inbound, ByteBufAllocator alloc) {
        this.response = response;
        MultiValueMap<String, String> adapter = new NettyHeadersAdapter(response.responseHeaders());
        this.headers = HttpHeaders.readOnlyHttpHeaders(adapter);
        this.inbound = inbound;
        this.bufferFactory = new NettyDataBufferFactory(alloc);
        this.logPrefix = "";
    }


    @Override
    public Flux<DataBuffer> getBody() {
        return this.inbound.receive()
                .doOnSubscribe(s -> {
                    if (this.state.compareAndSet(0, 1)) {
                        return;
                    }
                    if (this.state.get() == 2) {
                        throw new IllegalStateException(
                                "The client response body has been released already due to cancellation.");
                    }
                })
                .map(byteBuf -> {
                    byteBuf.retain();
                    return this.bufferFactory.wrap(byteBuf);
                });
    }

    @Override
    public HttpHeaders getHeaders() {
        return this.headers;
    }

    @Override
    public HttpStatus getStatusCode() {
        return HttpStatus.valueOf(getRawStatusCode());
    }

    @Override
    public int getRawStatusCode() {
        return this.response.status().code();
    }

    @Override
    public MultiValueMap<String, ResponseCookie> getCookies() {
        MultiValueMap<String, ResponseCookie> result = new LinkedMultiValueMap<>();
        this.response.cookies().values().stream()
                .flatMap(Collection::stream)
                .forEach(cookie -> result.add(cookie.name(),
                        ResponseCookie.fromClientResponse(cookie.name(), cookie.value())
                                .domain(cookie.domain())
                                .path(cookie.path())
                                .maxAge(cookie.maxAge())
                                .secure(cookie.isSecure())
                                .httpOnly(cookie.isHttpOnly())
                                .sameSite(getSameSite(cookie))
                                .build()));
        return CollectionUtils.unmodifiableMultiValueMap(result);
    }

    @Nullable
    private static String getSameSite(Cookie cookie) {
        if (cookie instanceof DefaultCookie) {
            DefaultCookie defaultCookie = (DefaultCookie) cookie;
            if (defaultCookie.sameSite() != null) {
                return defaultCookie.sameSite().name();
            }
        }
        return null;
    }

    /**
     * Called by {@link ReactorClientHttpConnector} when a cancellation is detected
     * but the content has not been subscribed to. If the subscription never
     * materializes then the content will remain not drained. Or it could still
     * materialize if the cancellation happened very early, or the response
     * reading was delayed for some reason.
     */
    void releaseAfterCancel(HttpMethod method) {
        if (mayHaveBody(method) && this.state.compareAndSet(0, 2)) {
            if (logger.isDebugEnabled()) {
                logger.debug(this.logPrefix + "Releasing body, not yet subscribed.");
            }
            this.inbound.receive().doOnNext(byteBuf -> {
            }).subscribe(byteBuf -> {
            }, ex -> {
            });
        }
    }

    private boolean mayHaveBody(HttpMethod method) {
        int code = this.getRawStatusCode();
        return !((code >= 100 && code < 200) || code == 204 || code == 205 ||
                method.equals(HttpMethod.HEAD) || getHeaders().getContentLength() == 0);
    }

    @Override
    public String toString() {
        return "ReactorClientHttpResponse{" +
                "request=[" + this.response.method().name() + " " + this.response.uri() + "]," +
                "status=" + getRawStatusCode() + '}';
    }

}
